package com.dtruth.dataflow.rpc;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public  class ExclaimBolt extends BaseBasicBolt {
    /**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	public void execute(Tuple tuple, BasicOutputCollector collector) {
        String input = tuple.getString(1);
        collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
    
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // builder.setSpout(drpcSpout);
        // builder.setBolt(new ExclaimBolt(), 3);
        // submit(builder.createTopology());
        
        LocalDRPC drpc = new LocalDRPC();
        DRPCSpout spout = new DRPCSpout("exclamation", drpc);
        builder.setSpout("drpc", spout);
        builder.setBolt("exclaim", new ExclaimBolt(), 3)
                .shuffleGrouping("drpc");
        builder.setBolt("return", new ReturnResults(), 3)
                .shuffleGrouping("exclaim");

        LocalCluster cluster = new LocalCluster();
        Config conf = new Config();
        cluster.submitTopology("drpc-demo", conf, builder.createTopology());

        // local mode 测试代码
        System.out.println(drpc.execute("exclamation", "hello"));

        cluster.shutdown();
        drpc.shutdown();
    }
}
